package org.databandtech.mysql2redis.sink;

import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.databandtech.mysql2redis.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import redis.clients.jedis.Jedis;

public class SinkToRedis extends RichSinkFunction<Tuple5<String,Integer,String,String,String>> {

	private static final long serialVersionUID = 1L;

	private static final Logger logger = LoggerFactory.getLogger(SinkToRedis.class);

    private Jedis jedis = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedis = RedisUtil.getResource();
        logger.info("==========> open jedis: " + jedis.toString());
    }

    @Override
    public void invoke(Tuple5<String,Integer,String,String,String> value) {
        jedis.set(value.f0, new Gson().toJson(value));
    	//jedis.set(value.f0, value.f2);
        logger.info("==========> set value to redis: " + value.toString());
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (null != jedis) {
            logger.info("==========> close jedis: " + jedis.toString());
            jedis.close();
        }
    }
}
